Давайте представим ситуацию: у вас поднят сервер, который слушает подключения на порту 8888.
И к вам подключается не 1, а сразу несколько клиентов.
Каждый из клиентов будет общаться с сервером (путем пересылки туда-обратно нескольких сообщений)
Вопрос - как нам их обрабатывать?
Перед тем, как отвечать на этот вопрос, давайте посмотрим, из чего состоит наше взаимодействие с клиентом
Ну и так далее...
Главное, то тут нужно уловить - Алиса много тупит
А сервер тратит много времени на ожидание
Возвращаемся к вопросу - как обрабатывать клиентов?
А давайте придумаем, как не тратить время на ожидание
Идея в том, чтоб во время ожидания одного клиента обрабатывать другого
Пока Алиса тупит, мы заняты и делаем полезную работу
Ну т.е. по сути 'алгоритм' сервера следующий:
// псевдокод
while (1) {
if (is_ready(alice_fd)) {
work_with(alice_fd);
} else if (is_ready(bob_fd)) {
work_with(bob_fd);
} else {
sleep();
}
}
Выглядит вроде ок, если клиентов 2. Если клиентов больше, работать с ними будет труднее.
Как вообще is_ready можно реализовать?)
%%writefile example1.c
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
int main() {
char buffer[123];
fcntl(0, F_SETFL, O_NONBLOCK); // Вся магия тут!)
int len = read(0, buffer, sizeof(buffer));
printf("%d\n", len);
return 0;
}
Overwriting example1.c
!gcc example1.c -o example1
!./example1
-1
Если вход будет пустой - будет -1
[danila@archlinux ~/caos/14-async & multithreading]
$ ./example1
-1
Если на вход подать строку - будет ожидаемое поведение
[danila@archlinux ~/caos/14-async & multithreading]
$ ./example1 <<EOF
heredoc> asdadsadadada
heredoc> EOF
14
Причем даже так будет -1))
[danila@archlinux ~/caos/14-async & multithreading]
$ cat | ./example1
-1
asdadsa
Применив флаг O_NONBLOCK мы сделали чтение неблокирующим
Для более удобной работы с несколькими fd было бы удобно иметь механизм по типу такого:
while (1) {
int ready_fd = get_ready_client();
work_with(ready_fd);
}
Как же классно, что такой механизм есть!)
NAME
epoll - I/O event notification facilitySYNOPSIS
#include <sys/epoll.h>DESCRIPTION
The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them. The epoll API can be used either as an edge-triggered or a level-trig‐ gered interface and scales well to large numbers of watched file descriptors.
Перевод простыми словами
epoll - это инструмент ядра для мониторинга файловых дескрипторов. Он вас проинформирует,
когда один из дескрипторов будет готов для ввода-вывода.
Давайте посмотрим, как с этим работать
%%writefile example2.c
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <signal.h>
#include <sched.h>
int exit_pipe[2];
void handler(int sig) {
char msg[] = "123";
write(exit_pipe[1], msg, sizeof(msg));
}
int main() {
pipe(exit_pipe);
dprintf(2, "signal\n");
struct sigaction sa = {
.sa_handler = handler,
.sa_flags = SA_RESTART
};
sigaction(SIGINT, &sa, NULL);
int epoll_fd = epoll_create1(0); // no flags
fcntl(exit_pipe[0], F_SETFL, O_NONBLOCK);
fcntl(0, F_SETFL, O_NONBLOCK);
int fifo_fd = open("test_fifo", O_RDONLY);
fcntl(fifo_fd, F_SETFL, O_NONBLOCK);
dprintf(2, "epoll_ctl...\n");
int all_fds[] = {exit_pipe[0], 0, fifo_fd};
for (int i = 0; i < 3; ++i) {
struct epoll_event event = {
.events = EPOLLIN,
.data.fd = all_fds[i]
};
// Будем ожидать готовности на чтение
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, all_fds[i], &event);
}
dprintf(2, "Main loop\n");
while (1) {
struct epoll_event event;
int N = epoll_wait(epoll_fd, &event, /*maxevents=*/ 1, /*timeout=*/ 1000);
if (N <= 0) {
sched_yield();
continue;
}
int fd = event.data.fd;
if (fd == exit_pipe[0]) {
break;
}
char buffer[1024];
size_t len = read(fd, buffer, sizeof(buffer));
write(1, buffer, len);
}
dprintf(2, "Closing all\n");
for (int i = 0; i < 3; ++i) {
close(all_fds[i]);
}
close(epoll_fd);
dprintf(2, "Exiting...\n");
return 0;
}
Overwriting example2.c
# term0
[danila@archlinux ~/caos/14-async & multithreading]
$ ./example2
signal
epoll_ctl...
Main loop
1234
1234
abc
^CClosing all
Exiting...
[danila@archlinux ~/caos/14-async & multithreading]
$
# term1
[danila@archlinux ~/caos/14-async & multithreading]
$ cat > test_fifo
abc
Что ещё важно подчеркнуть
Сори, она слишком простая (примера выше достаточно чтоб решить и нулевку, и единичку)
Кратенькая шутка про кассирш в КСП
|
|
|
(перед защитой вспомните, чем они от процессов отличаются)
В Си есть кросплатформенная реализация потоков - через библиотеку pthread
#include <pthread.h>
void *worker(void *args) {
/// ...
}
int main() {
pthread_t thread;
int my_int_arg;
pthread_create(
&thread,
NULL /*init and destroy thread attributes*/,
worker,
(void*)(&my_int_arg)
);
// ...
pthread_join(&thread, NULL /*retval*/);
}
Поскольку в задачах просят создавать ПОТОКИ, а не ПРОЦЕССЫ, используйте их))
Единственная сложность в многопоточке - сделать так, чтоб разные потоки/процессы друг другу не мешали при работе с одними и теми же данными
Одно из важнейших свойств инструкций (ощутите проблему) - атомарность
Атомарная операция/инструкция - операция, которая не может быть прервана. Она либо выполняется до конца, либо не выполняется вообще
Очевидный пример неатомарной операции
Атомарные операции
Атомарная инструкция или нет - определяется ещё при проектировании ПРОЦЕССОРА.
Чтоб простые разработчики (мы с вами) могли с этим как-то жить и работать, при портировании стандартных библиотек портируют различные примитивы - atomics, mutex, condvars, ...
(более подробно будет на курсе Ромы Липовского в следующем семе)
Atomic (атомик) - тип данных, поддерживающий атомарные операции над собой (остальное позже)
ВАЖНО ДЛЯ ПОНИМАНИЯ
Если вы вызываете атомарную функцию, это не значит, что ЭТА КОНКРЕТНАЯ СТРОЧКА атомарна
Пример cas-инструкции - lock-free добавление элемента в list)
// не атомарно
item_t *new_node = malloc(...);
do {
// тоже не атомарно
new_node->next = head;
// опять не атомарно, но где-то там есть что-то атомарное
} while (!atomic_compare_exchange_weak(&head, &new_node->next, (_Atomic item_t*)new_node));
(ниже ещё раз разберем этот пример)
(пока просто запомните, что атомики существуют)
Ладно, окей, а что если надо получать доступ к большим областям памяти?
Тут атомики не помогут
Да, но тут помогают примитивы синхронизации
Пример - два потока пытаются одновременно работать с каким-то разделяемым ресурсом (сокетом, файлом, пользователем, большим куском данных)
Чтоб защитить разделяемый ресурс используют mutex.
Два потока пытаются захватить mutex. Кто захватил mutex - тот и работает с ресурсом
Аналогия - флажок
Флажок один. Что угодно делай - он останется один единственный
Флажком может владеть либо какой-то один процесс (процесс держит флажок в руках), либо никто (флажок стоит на постаменте)
Пример
%%writefile example3.c
// without mutex
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int global_int;
void *worker(void *arg) {
int *gi = (int*)arg;
for (int i = 0; i < 1000; ++i) {
int old = *gi;
int new = old + 1;
*gi = new;
}
}
int main() {
global_int = 0;
pthread_t threads[2];
pthread_create(threads + 0, NULL, worker, (void*)(&global_int));
pthread_create(threads + 1, NULL, worker, (void*)(&global_int));
pthread_join(threads[0], NULL);
pthread_join(threads[1], NULL);
printf("%d\n", global_int);
return 0;
}
Overwriting example3.c
!gcc example3.c -o example3
!./example3
1980
Выше НЕ ВСЕГДА 2000
Иногда вам не повезет
Касательно слов всегда, иногда и прочих
Когда размышляете над многопоточным кодом, (в уме) доказывайте 2 вещи:
%%writefile example4.c
// with mutex
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t mutex;
int global_int;
void *worker(void *arg) {
int *gi = (int*)arg;
for (int i = 0; i < 1000; ++i) {
pthread_mutex_lock(&mutex);
int old = *gi;
int new = old + 1;
*gi = new;
pthread_mutex_unlock(&mutex);
}
}
int main() {
global_int = 0;
pthread_t threads[2];
pthread_create(threads + 0, NULL, worker, (void*)(&global_int));
pthread_create(threads + 1, NULL, worker, (void*)(&global_int));
pthread_join(threads[0], NULL);
pthread_join(threads[1], NULL);
printf("%d\n", global_int);
return 0;
}
Writing example4.c
!gcc example4.c -o example4
!./example4
2000
Код стал работать ДОЛЬШЕ, но стал гарантировать ответ 2000
Разгоняем телегу с глупыми и нет аналогиями))
Допустим вы работали себе, никого не трогали.
Делигировали какую-нибудь сложную задачу другому потоку (факторизация числа or smth).
Продолжили работать.
Внезапно дошли до момента, когда вам понадобился результат.
Вопросы: Как его забрать?) Как понять что результат готов? Как понять, что это новый результат, а не старый?
Вопрос со звездочкой: А что если вместе с вами кто-то ещё ждет результат этой же работы?
Хорошее решение - лечь спать и попросить разбудить)
Вот для решения такой проблемы служит condvar
Ещё что важно знать - condvar-у для работы нужен залоченый mutex
+-Очевидно, если помнить, что:
Пример:
%%writefile example5.c
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
typedef struct {
pthread_mutex_t *mutex;
pthread_cond_t *condvar;
int64_t *next_out;
} gen_args_t;
void *do_work(void *args) {
gen_args_t *params = (gen_args_t*)args;
for (int64_t i=0; ; ++i) {
pthread_mutex_lock(params->mutex);
*params->next_out = i;
pthread_cond_signal(params->condvar);
while (*params->next_out != -1)
pthread_cond_wait(params->condvar, params->mutex);
pthread_mutex_unlock(params->mutex);
}
}
int main(int argc, char **argv) {
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
int64_t next_out;
gen_args_t thread_args = {
.mutex = &mutex,
.condvar = &condvar,
.next_out = &next_out,
};
pthread_t worker;
pthread_create(&worker, NULL, do_work, &thread_args);
next_out = -1;
const int N = 10;
for (int32_t count = 0; count < N; ++count) {
pthread_mutex_lock(&mutex);
while (next_out == -1)
pthread_cond_wait(&condvar, &mutex);
int64_t cur_out = next_out;
next_out = -1;
pthread_mutex_unlock(&mutex);
if (count != N - 1)
pthread_cond_signal(&condvar);
printf("%lld\n", cur_out);
}
pthread_cancel(worker);
pthread_cond_destroy(&condvar);
pthread_mutex_destroy(&mutex);
return 0;
}
Overwriting example5.c
!gcc example5.c -o example5
!./example5
0 1 2 3 4 5 6 7 8 9
Сейчас давайте вернемся к понятию атомарных операций
Операция атомарна, если она не может быть прервана
Думаю полезнее будет обсудить, как написать mutex на псевдокоде
Основной инструмент - CAS
Compare And Save operation
CAS(atomic_ptr, atomic_expected, value_if_true, value_if_false) -> returns true on swap